{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "flight_perf_file = \"file:///home/shilinlee/workspace/shilinlee/blog/spark_python/dataframe/departuredelays.csv\"\n",
    "airports_file = \"file:///home/shilinlee/workspace/shilinlee/blog/spark_python/dataframe/airport-codes-na.txt\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession.builder.getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 获取机场数据集\n",
    "airports = spark.read.csv(airports_file, header='true', inferSchema=\"true\", sep=\"\\t\")\n",
    "airports.createOrReplaceTempView(\"airports\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 获取起飞延时数据集\n",
    "flight_perf = spark.read.csv(flight_perf_file, header='true')\n",
    "flight_perf.createOrReplaceTempView(\"FlightPerformance\")\n",
    "flight_perf.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------+--------+\n",
      "|   City|origin|  Delays|\n",
      "+-------+------+--------+\n",
      "|Seattle|   SEA|159086.0|\n",
      "|Spokane|   GEG| 12404.0|\n",
      "|  Pasco|   PSC|   949.0|\n",
      "+-------+------+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# 通过城市和起飞代码查询华盛顿州的航班延误总数。这就要求将飞机性能数据和机场数据，通过国际航空运输协会（IATA）将代码关联起来。\n",
    "spark.sql(\"\"\"\n",
    "select a.City, f.origin, sum(f.delay) as Delays \n",
    "from FlightPerformance as f\n",
    "join airports as a on a.IATA = f.origin  \n",
    "where a.State = 'WA'\n",
    "group by a.City, f.origin\n",
    "order by sum(f.delay) desc\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---------+\n",
      "|State|   Delays|\n",
      "+-----+---------+\n",
      "|   SC|  80666.0|\n",
      "|   AZ| 401793.0|\n",
      "|   LA| 199136.0|\n",
      "|   MN| 256811.0|\n",
      "|   NJ| 452791.0|\n",
      "|   OR| 109333.0|\n",
      "|   VA|  98016.0|\n",
      "| null| 397237.0|\n",
      "|   RI|  30760.0|\n",
      "|   WY|  15365.0|\n",
      "|   KY|  61156.0|\n",
      "|   NH|  20474.0|\n",
      "|   MI| 366486.0|\n",
      "|   NV| 474208.0|\n",
      "|   WI| 152311.0|\n",
      "|   ID|  22932.0|\n",
      "|   CA|1891919.0|\n",
      "|   CT|  54662.0|\n",
      "|   NE|  59376.0|\n",
      "|   MT|  19271.0|\n",
      "+-----+---------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# 查询US按照州分组的延误航班总数\n",
    "spark.sql(\"\"\"\n",
    "select a.State, sum(f.delay) as Delays \n",
    "from FlightPerformance as f\n",
    "join airports as a on a.IATA = f.origin  \n",
    "where a.Country = 'USA'\n",
    "group by a.State\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**For more information, please refer to:**\n",
    "- [Spark SQL, DataFrames and Datasets Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#sql)\n",
    "- [PySpark SQL Module: DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)\n",
    "- [PySpark SQL Functions Module](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Parsed Logical Plan ==\n",
      "'Aggregate ['a.State], ['a.State, 'sum('f.delay) AS Delays#156]\n",
      "+- 'Filter ('a.Country = USA)\n",
      "   +- 'Join Inner, ('a.IATA = 'f.origin)\n",
      "      :- 'SubqueryAlias `f`\n",
      "      :  +- 'UnresolvedRelation `FlightPerformance`\n",
      "      +- 'SubqueryAlias `a`\n",
      "         +- 'UnresolvedRelation `airports`\n",
      "\n",
      "== Analyzed Logical Plan ==\n",
      "State: string, Delays: double\n",
      "Aggregate [State#11], [State#11, sum(cast(delay#29 as double)) AS Delays#156]\n",
      "+- Filter (Country#12 = USA)\n",
      "   +- Join Inner, (IATA#13 = origin#31)\n",
      "      :- SubqueryAlias `f`\n",
      "      :  +- SubqueryAlias `flightperformance`\n",
      "      :     +- Relation[date#28,delay#29,distance#30,origin#31,destination#32] csv\n",
      "      +- SubqueryAlias `a`\n",
      "         +- SubqueryAlias `airports`\n",
      "            +- Relation[City#10,State#11,Country#12,IATA#13] csv\n",
      "\n",
      "== Optimized Logical Plan ==\n",
      "Aggregate [State#11], [State#11, sum(cast(delay#29 as double)) AS Delays#156]\n",
      "+- Project [delay#29, State#11]\n",
      "   +- Join Inner, (IATA#13 = origin#31)\n",
      "      :- Project [delay#29, origin#31]\n",
      "      :  +- Filter isnotnull(origin#31)\n",
      "      :     +- InMemoryRelation [date#28, delay#29, distance#30, origin#31, destination#32], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "      :           +- *(1) FileScan csv [date#28,delay#29,distance#30,origin#31,destination#32] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/shilinlee/workspace/shilinlee/blog/spark_python/dataframe/departured..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date:string,delay:string,distance:string,origin:string,destination:string>\n",
      "      +- Project [State#11, IATA#13]\n",
      "         +- Filter ((isnotnull(Country#12) && (Country#12 = USA)) && isnotnull(IATA#13))\n",
      "            +- Relation[City#10,State#11,Country#12,IATA#13] csv\n",
      "\n",
      "== Physical Plan ==\n",
      "*(3) HashAggregate(keys=[State#11], functions=[sum(cast(delay#29 as double))], output=[State#11, Delays#156])\n",
      "+- Exchange hashpartitioning(State#11, 200)\n",
      "   +- *(2) HashAggregate(keys=[State#11], functions=[partial_sum(cast(delay#29 as double))], output=[State#11, sum#186])\n",
      "      +- *(2) Project [delay#29, State#11]\n",
      "         +- *(2) BroadcastHashJoin [origin#31], [IATA#13], Inner, BuildRight\n",
      "            :- *(2) Filter isnotnull(origin#31)\n",
      "            :  +- InMemoryTableScan [delay#29, origin#31], [isnotnull(origin#31)]\n",
      "            :        +- InMemoryRelation [date#28, delay#29, distance#30, origin#31, destination#32], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "            :              +- *(1) FileScan csv [date#28,delay#29,distance#30,origin#31,destination#32] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/shilinlee/workspace/shilinlee/blog/spark_python/dataframe/departured..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date:string,delay:string,distance:string,origin:string,destination:string>\n",
      "            +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))\n",
      "               +- *(1) Project [State#11, IATA#13]\n",
      "                  +- *(1) Filter ((isnotnull(Country#12) && (Country#12 = USA)) && isnotnull(IATA#13))\n",
      "                     +- *(1) FileScan csv [State#11,Country#12,IATA#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/shilinlee/workspace/shilinlee/blog/spark_python/dataframe/airport-co..., PartitionFilters: [], PushedFilters: [IsNotNull(Country), EqualTo(Country,USA), IsNotNull(IATA)], ReadSchema: struct<State:string,Country:string,IATA:string>\n"
     ]
    }
   ],
   "source": [
    "# Prints the (logical and physical) plans to the console for debugging purpose.\n",
    "\n",
    "# Parameters\n",
    "# extended – boolean, default False. If False, prints only the physical plan.\n",
    "spark.sql(\"\"\"\n",
    "select a.State, sum(f.delay) as Delays \n",
    "from FlightPerformance as f\n",
    "join airports as a on a.IATA = f.origin  \n",
    "where a.Country = 'USA'\n",
    "group by a.State\n",
    "\"\"\").explain(True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
